Confluent & KSQL

1. Confluent简介

1.1 Confluent 背景

LinkedIn的Kafka研发团队,有个三人小组出来创业了。基于这项技术Jay Kreps带头创立了新公司Confluent,Confluent的产品围绕着Kafka做的。 Confluent 致力于为各行各业的公司提供实时数据处理服务的解决方案。该公司已获 LinkedIn 等公司的融资。

1.2 Confluent 定位

Kreps将Kafka描述为LinkedIn的“中枢神经系统”,管理从各个应用程序汇聚到此的信息流,这些数据经过处理后再被分发到各处。

不同于传统的企业信息列队系统,Kafka是以近乎实时的方式处理流经一个公司的所有数据,目前已经为LinkedIn, Netflix, Uber和Verizon建立了实时信息处理平台。

因此,Confluent Platform 是一个流数据平台,能够组织管理来自不同数据源的数据,拥有稳定高效的系统。

Confluent Platform

流式数据处理平台不仅提供了应用系统传输数据的能力,又提供了连接数据源,应用系统,数据管道等所需的全部工具。

Confluent Platform 可以低成本建立实时数据管道和流应用。通过将多个来源和位置的数据集成到一个中央数据流平台,Confluent Platform使您可以专注于如何从数据中获得商业价值而不是担心底层机制,如数据是如何传输或不同系统间的风险,异常。具体来说,Confluent Platform简化了连接数据源到Kafka,用Kafka构建应用程序,以及安全,监控和管理您的Kafka的基础设施。

confluentPlatform3.1

传统架构模式:

platform_chart_updated

Kafka–分布式流数据平台:

streaming_platform_rev

Confluent Platform–在Kafka基础上,更加完善的分布式数据流平台:

confluent_platform_diagram

Confluent Platform:

kafka-enterpise-graphic

总的来说,Confluent Platform平台的组件致力于通过统一而灵活的方式建立一个企业范围的数据流平台。

1.3 Confluent 组件

confluentPackage

Kafka组件:

  • Kafka Java Clients(开源)。 Kafka Clients Java 库,向 kafka 写消息或者从 kafka 读消息。
  • Kafka Streams(开源)。 Kafka Streams是一个流式数据处理库,使kafka成为功能齐全的流处理系统。
  • Kafka Connect(开源)。 一种可扩展的和可靠的连接Kafka框架与外部系统(如数据库,键值存储,搜索索引和文件系统)的框架。

除了Kafka以外, Confluent Platform 包括更多的工具和服务,使构建和管理数据流平台更加容易。

Confluent Platform开源组件:

  • Confluent Kafka Connectors(开源)。Kafka Connectors 中的 Kafka Connector API使得Kafka可与其他数据系统相连接,如Hadoop等。 Confluent 再次基础上完善了更多的connectors,包含当前大多数主流的数据源和数据接收器系统。
  • Confluent Kafka Clients(开源)。其他编程语言的 Kafka Clients 库,包括C/C++,Python,Go,.Net等。
  • Confluent Schema Registry(开源)。Schema 注册服务。确保每一个应用系统使用正确的schema写入数据,确保发送到同一topic的消息格式一致,保证兼容性。
  • Confluent Kafka REST Proxy(开源)。应用系统通过RESTful HTTP服务,和kafka之间发送和接收消息。

Confluent Platform闭源组件:

  • Automatic Data Balancing(闭源)。数据自动负载均衡。
  • Multi-Datacenter Replication(闭源)。多数据中心备份。
  • Confluent Control Center(闭源)。最全面的管理和监控Kafka的GUI系统。
  • JMS Client(闭源)。Kafka JMS 兼容组件。

2. Confluent使用

2.1 Confluent 下载

下载地址:https://www.confluent.io/download/

2.2 Confluent 启动

要求

JAVA >= 1.7

组件配置

默认配置:

Component Default Port
Zookeeper 2181
Apache Kafka brokers (plain text) 9092
Schema Registry REST API 8081
REST Proxy 8082
Kafka Connect REST API 8083
Confluent Control Center 9021

启动

1
confluent start schema-registry

可以看到输出如下的日志:

1
2
3
4
5
6
Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]

也可单独启动每一个进程:

1
2
3
$ ./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
$ ./bin/kafka-server-start ./etc/kafka/server.properties
$ ./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

2.3 Confluent Producer

1
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

再持续输入待发送的消息

1
2
3
{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

2.4 Confluent Consumer

1
./bin/kafka-avro-console-consumer --topic test --zookeeper localhost:2181 --from-beginning

2.5 错误schema

1
./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic test --property value.schema='{"type":"int"}'

会出现如下的错误:

1
2
3
4
5
6
7
8
9
10
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "int"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with the latest schema; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.httpRequest(RestUtils.java:146)
at io.confluent.kafka.schemaregistry.client.rest.utils.RestUtils.registerSchema(RestUtils.java:174)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:51)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:49)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:155)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:94)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

3. KSQL 简介

KSQL 是 Kafka 的一个流式处理引擎。提供了对 Kafka 流处理一个简单的和完全交互式 SQL 接口;无需在使用 java 或 python 编写代码。KSQL 是开源的(Apache 2.0 licensed)、分布式、可扩展的、可靠的、实时的。它支持一系列强大的流处理的操作包括聚合,连接,窗口,会话化,以及其他更多功能。

4. KSQL 编译&启动

4.1 要求

  • KSQL 目前是研发人员预览版,不要在生产环境使用
  • Confluent Platform 3.3.0
  • MAVEN
  • GIT
  • JAVA >= 1.8

4.2 编译

clone

1
$ git clone git@github.com:confluentinc/ksql.git

install

1
2
$ cd ksql
$ mvn clean compile install -DskipTests

4.3 启动

1
$ ./bin/ksql-cli local

成功会如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                   ======================================
= _ __ _____ ____ _ =
= | |/ // ____|/ __ \| | =
= | ' /| (___ | | | | | =
= | < \___ \| | | | | =
= | . \ ____) | |__| | |____ =
= |_|\_\_____/ \___\_\______| =
= =
= Streaming SQL Engine for Kafka =
Copyright 2017 Confluent Inc.

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

5. KSQL 生产消息

使用 KSQL 提供的 examples 程序,每隔10s,持续地向 pageviews topic 发送一条消息,消息格式为 DELIMITED 格式。

1
$ java -jar ksql-examples/target/ksql-examples-0.1-SNAPSHOT-standalone.jar quickstart=pageviews format=delimited topic=pageviews maxInterval=10000

使用 KSQL 提供的 examples 程序,每隔10s,持续地向 users topic 发送一条消息,消息格式为 JSON 格式。

1
$ java -jar ksql-examples/target/ksql-examples-0.1-SNAPSHOT-standalone.jar quickstart=users format=json topic=users maxInterval=10000

6. KSQL 查询

6.1 创建 STREAM 和 TABLE

STREAM

通过 Kafka topic pageviews,创建一个 STREAM,指定 value_format 格式为 DELIMITED 格式。

1
ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH (kafka_topic='pageviews', value_format='DELIMITED');

使用 DESCRIBE 命令,查询刚刚新建的 STREAM。可以看到 KSQL 添加了额外的列,列名为 ROWTIME,相当于 Kafka 消息的时间戳;和额外的列,列名为 ROWKEY,相当于 Kafka 消息的key。

1
2
3
4
5
6
7
8
9
ksql> DESCRIBE pageviews_original;

Field | Type
----------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)

TABLE

通过 Kafka topic users,创建一个 TABLE,指定 users_original 格式为 JSON 格式。

1
ksql> CREATE TABLE users_original (registertime bigint, gender varchar, regionid varchar, userid varchar) WITH (kafka_topic='users', value_format='JSON');

使用 DESCRIBE 命令,查询刚刚新建的 TABLE。

1
2
3
4
5
6
7
8
9
10
ksql> DESCRIBE users_original;

Field | Type
--------------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
REGISTERTIME | BIGINT
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
USERID | VARCHAR(STRING)

SHOW

展示全部 STREAMS 和 TABLES

1
2
3
4
5
6
7
8
9
10
11
ksql> SHOW STREAMS;

Stream Name | Kafka Topic | Format
-----------------------------------------------------------------
PAGEVIEWS_ORIGINAL | pageviews | DELIMITED

ksql> SHOW TABLES;

Table Name | Kafka Topic | Format | Windowed
--------------------------------------------------------------
USERS_ORIGINAL | users | JSON | false

6.2 查询语句

SELECT

使用 SELECT 语句创建一个 query ,并从 STREAM 中返回数据。

1
2
3
4
5
6
7
ksql> SELECT pageid FROM pageviews_original LIMIT 3;
Page_24
Page_73
Page_78
LIMIT reached for the partition.
Query terminated
ksql>

持久化查询

在SELECT语句之前,使用 CREATE STREAM 关键字来创建持久化查询。与刚刚的非持久性的查询相比,持久化查询的结果会写入到一个 Kafka 的 PAGEVIEWS_FEMALE topic中去。如下的查询语句,通过和 users_original TABLE的 userid字段上做 LEFT JOIN操作,使得结果是在 pageviews STREAM 基础上,更加丰富的一种表现。

1
2
3
4
5
6
7
8
9
10
11
ksql> CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';

ksql> DESCRIBE pageviews_female;
Field | Type
----------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)

SELECT 查询持久化查询语句的结果

1
2
3
4
5
6
7
ksql> SELECT * FROM pageviews_female;
1506483098313 | User_5 | User_5 | Page_31 | Region_9 | FEMALE
1506483117603 | User_9 | User_9 | Page_12 | Region_4 | FEMALE
1506483135819 | User_1 | User_1 | Page_31 | Region_8 | FEMALE
1506483151852 | User_1 | User_1 | Page_26 | Region_8 | FEMALE
1506483152310 | User_8 | User_8 | Page_34 | Region_7 | FEMALE
^CQuery terminated

LIKE 关键字&指定topic

创建新的持久化查询语句,使用 LIKE 关键字。且指定了输出 topic ,该持久化查询语句的结果会写入 Kafka 的 pageviews_enriched_r8_r9 topic中。

1
ksql> CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';

滚动时间窗口&统计

创建持久化查询语句,对 pageviews_female STREAM, 在30s的滚动时间窗口内,进行计数。结果会写入到 PAGEVIEWS_REGIONS topic 中。

1
2
3
4
5
6
7
8
9
10
11
ksql> CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;

ksql> DESCRIBE pageviews_regions;

Field | Type
----------------------------
ROWTIME | BIGINT
ROWKEY | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
NUMUSERS | BIGINT ·

查询

1
2
3
4
5
6
7
8
ksql> SELECT regionid, numusers FROM pageviews_regions LIMIT 5;
Region_3 | 2
Region_7 | 2
Region_4 | 2
Region_4 | 2
Region_4 | 3
LIMIT reached for the partition.
Query terminated

查看全部持久化查询语句

1
2
3
4
5
6
7
ksql> SHOW QUERIES;

Query ID | Kafka Topic | Query String
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | PAGEVIEWS_FEMALE | CREATE STREAM pageviews_female AS SELECT users_original.userid AS userid, pageid, regionid, gender FROM pageviews_original LEFT JOIN users_original ON pageviews_original.userid = users_original.userid WHERE gender = 'FEMALE';
2 | pageviews_enriched_r8_r9 | CREATE STREAM pageviews_female_like_89 WITH (kafka_topic='pageviews_enriched_r8_r9', value_format='DELIMITED') AS SELECT * FROM pageviews_female WHERE regionid LIKE '%_8' OR regionid LIKE '%_9';
3 | PAGEVIEWS_REGIONS | CREATE TABLE pageviews_regions AS SELECT gender, regionid , COUNT(*) AS numusers FROM pageviews_female WINDOW TUMBLING (size 30 second) GROUP BY gender, regionid HAVING COUNT(*) > 1;

7. KSQL 架构

単例模式:
KSQL-standalone-mode

Client-server模式:
KSQL-client-server

特点:

  • KSQL CLI 是 client 端,是一个控制台。
  • KSQL Engine 是 server 端,负责计算执行。
  • client 与 server 通过 http 连接。
  • Client-server 集群模式中,所有的 servers 共享用户提交的 KSQL 查询语句的计算任务。
  • 支持水平扩展。可动态添加 KSQL server 实例,以提升整体集群的容量和计算能力。
  • 可动态减少 KSQL server 实例,以降低整体集群的容量和计算能力。其余的 KSQL servers 会对集群全部的查询计算任务自动负载均衡。

8. 参考

  1. https://www.confluent.io/
  2. https://github.com/confluentinc/ksql